package com.starzy.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * @Author: starzy https://www.cnblogs.com/starzy
 * @Description: Sorts words globally and numbers them (total-order sort across several reducers).
 * @Date: Created in 10:54 2020/12/14
 */
public class WordSortMR {

    /** Job counters reported by {@link WordSortReducer}. */
    enum WordCountCounter {
        /** Number of distinct words emitted, summed over all reduce tasks. */
        WORD_COUNT
    }

    /** Input directory used when no first program argument is given. */
    private static final String DEFAULT_INPUT = "D:/data/words/input";
    /** Output directory used when no second argument is given; it is deleted before each run. */
    private static final String DEFAULT_OUTPUT = "D:/data/words/output";
    /** Full URI of the sampled split-point file, used when no third argument is given. */
    private static final String DEFAULT_PARTITION_FILE = "file:///D:/data/words/partition/par.lst";
    /** Number of reduce tasks, i.e. number of key ranges / output files. */
    private static final int NUM_REDUCERS = 3;

    /**
     * Configures and runs the word-sort job, exiting with 0 on success and 1 on failure.
     *
     * @param args optional {@code [inputDir [outputDir [partitionFile]]]}; any missing or blank
     *             entry falls back to the corresponding hard-coded default
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Kept from the original setup. NOTE(review): with "false" TotalOrderPartitioner is
        // expected to locate split points via the sort comparator instead of a byte trie —
        // confirm against the Hadoop version in use.
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        // Job.getInstance copies conf; later settings must go through job.getConfiguration().
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordSortMR.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setNumReduceTasks(NUM_REDUCERS);

        job.setMapperClass(WordSortMapper.class);
        job.setReducerClass(WordSortReducer.class);

        // Mapper output: (word, word); reducer output: (rank, word).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        Path inputPath = pathArg(args, 0, DEFAULT_INPUT);
        Path outputPath = pathArg(args, 1, DEFAULT_OUTPUT);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        deleteIfExists(outputPath, job.getConfiguration());

        // Must run after input format, input paths, map output key class and reducer count are set.
        configureTotalOrderPartitioning(job, pathArg(args, 2, DEFAULT_PARTITION_FILE));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /** Returns {@code args[index]} as a Path, or {@code defaultValue} when it is absent or blank. */
    private static Path pathArg(String[] args, int index, String defaultValue) {
        boolean present = args != null && args.length > index
                && args[index] != null && !args[index].trim().isEmpty();
        return new Path(present ? args[index] : defaultValue);
    }

    /** Recursively deletes {@code dir} if it exists, on the file system that owns the path. */
    private static void deleteIfExists(Path dir, Configuration conf) throws IOException {
        // Resolve the FS from the path itself so a qualified path (file://, hdfs://, ...) is
        // deleted where it actually lives rather than on the default file system.
        FileSystem fs = dir.getFileSystem(conf);
        if (fs.exists(dir)) {
            fs.delete(dir, true);
        }
    }

    /**
     * Samples input keys into {@code partitionFile} and routes map output through
     * TotalOrderPartitioner, so that every key in part-r-0000i sorts before every key in
     * part-r-0000(i+1).
     */
    private static void configureTotalOrderPartitioning(Job job, Path partitionFile)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Without this the default HashPartitioner is used and the sampled split points are
        // ignored: each output file would be sorted, but the files would not be globally ordered.
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // Full path/URI of the split-point file; tasks must be able to read it.
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);

        // RandomSampler(freq, numSamples, maxSplitsSampled):
        //  - freq: probability that a given input key is picked as a sample;
        //  - numSamples: maximum number of samples kept; once full, later picks replace a
        //    randomly chosen existing sample (use a larger value for big inputs);
        //  - maxSplitsSampled: maximum number of input splits read (not a partition count).
        // InputSampler samples the *input* keys (for KeyValueTextInputFormat, the text before the
        // first tab), not the words the mapper emits, so the ranges may be unbalanced.
        // NOTE(review): with freq 0.01 a small input can produce no samples at all, in which case
        // writePartitionFile appears to fail — raise freq for small test data.
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.01, 10, 100);
        InputSampler.writePartitionFile(job, sampler);
    }

    /**
     * Emits every whitespace-separated word of the record's value as {@code (word, word)}.
     * The input key is ignored.
     */
    static class WordSortMapper extends Mapper<Text, Text, Text, Text> {
        private static final Pattern WHITESPACE = Pattern.compile("\\s+");

        // Reused across calls: context.write serializes immediately, so mutating it afterwards is safe.
        private final Text word = new Text();

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // NOTE(review): KeyValueTextInputFormat puts a line that has no tab entirely into the
            // key and leaves the value empty, so such lines contribute no words here — confirm the
            // input really is "<key>\t<words...>".
            for (String token : WHITESPACE.split(value.toString())) {
                if (token.isEmpty()) {
                    continue; // produced by an empty value or leading whitespace
                }
                word.set(token);
                context.write(word, word);
            }
        }
    }

    /**
     * Numbers the distinct words of one reduce partition in their sorted order, emitting
     * {@code (rank, word)} and counting each word in {@link WordCountCounter#WORD_COUNT}.
     *
     * <p>Ranks start at 1 in every reduce task. With several reducers, the ranks in part-r-0000i
     * only become global after adding the word counts of the preceding partitions (for example,
     * in a follow-up pass).
     */
    static class WordSortReducer extends Reducer<Text, Text, LongWritable, Text> {
        private final LongWritable rank = new LongWritable();
        private long lastRank = 0L;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Counter counter = context.getCounter(WordCountCounter.WORD_COUNT);
            counter.increment(1L);
            // A local sequence gives the same per-task numbering as reading the counter back,
            // without relying on counter read-back semantics.
            rank.set(++lastRank);
            context.write(rank, key);
        }
    }

}
